Conversation

@timgrein (Contributor) commented Oct 8, 2025

With this PR I've added a new request queue to the inference API, which immediately executes requests that are not ingest embeddings requests (e.g. embeddings generation on the search path, rerank, etc.). All other requests are simply submitted to the rate-limited execution path as before. The new request queue is processed by a separate thread, which blocks until a new request becomes available, avoiding explicit polling. This removes an additional worst-case latency of xpack.inference.http.request_executor.task_poll_frequency (default: 50ms), which we observed when generating sparse text embeddings using EIS.
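For illustration, here's a minimal, self-contained sketch of the idea (class and field names are placeholders, not the actual implementation):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Sketch: a dedicated consumer thread blocks on take() instead of polling
    // the queue every task_poll_frequency (50ms by default).
    class BlockingRequestQueueSketch {
        private final BlockingQueue<Runnable> requestQueue = new LinkedBlockingQueue<>();

        void start() {
            var consumer = new Thread(() -> {
                try {
                    while (true) {
                        Runnable task = requestQueue.take(); // blocks; no poll latency
                        task.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // exit when interrupted
                }
            }, "inference-request-queue-sketch");
            consumer.start();
        }

        boolean submit(Runnable task) {
            return requestQueue.offer(task); // non-blocking; false if the queue is full
        }
    }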

I've verified the improved latency by running ES (with & without the changes introduced by this PR) and EIS locally and executing requests using vegeta.

Without optimization:
[image: vegeta latency results without the change]

With optimization:
[image: vegeta latency results with the change]

@timgrein added the >bug, :ml (Machine learning), Team:ML (Meta label for the ML team), v9.2.0, and v9.2.1 labels on Oct 8, 2025
@elasticsearchmachine (Collaborator)
Pinging @elastic/ml-core (Team:ML)

@elasticsearchmachine (Collaborator)
Hi @timgrein, I've created a changelog YAML for you.

@jaybcee (Member) commented Oct 8, 2025

@timgrein Great work! Would you be able to similarly generate a graphic against a local EIS directly, so that we understand whether any overhead exists? I understand the logic behind the PR, but I'm curious what overhead still exists.

@jonathan-buttner (Contributor) left a comment

Great work! Left a few suggestions.

public void shutdown() {
if (shutdown.compareAndSet(false, true)) {
if (requestQueueTask != null) {
boolean cancelled = FutureUtils.cancel(requestQueueTask);
Contributor:

If I remember correctly, I think it's up to our implementation to check if it is canceled. So I think we'll get stuck in the queue.take() 🤔

It doesn't seem like FutureUtils.cancel() will do an interrupt.

This is how we've handled that in the past: https://github.com/elastic/elasticsearch/blob/8.13/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java#L253

The shutdown() method puts a noop task on the queue to ensure that it wakes up.

Can you double check the tests and make sure we're covering this case (we call shutdown and then await termination)?
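For illustration, the wake-up pattern looks roughly like this (a sketch of the linked approach; NOOP_TASK and the field names are assumptions):

    // Enqueue a no-op so a thread blocked in requestQueue.take() wakes up,
    // re-checks the shutdown flag, and exits cleanly.
    public void shutdown() {
        if (shutdown.compareAndSet(false, true)) {
            requestQueue.offer(NOOP_TASK); // offer(), not put(): never block during shutdown
        }
    }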

timgrein (Contributor Author):

Adjusted with "Add NoopTask to wake up queue on shutdown".

Can you double check the tests and make sure we're covering this case (we call shutdown and then await termination)?

AFAIU we always check that when calling submitShutdownRequest, right?

logger.debug("Inference request queue interrupted, exiting");
} catch (Exception e) {
logger.warn("Error processing task in inference request queue", e);
cleanup();
Contributor:

How about we move this to a finally block to ensure it gets called.
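A minimal sketch of the suggested change:

    try {
        // ... process the request queue ...
    } catch (Exception e) {
        logger.warn("Error processing task in inference request queue", e);
    } finally {
        cleanup(); // guaranteed to run on every exit path
    }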

startHandlingRateLimitedTasks();
} catch (Exception e) {
logger.warn("Failed to start request executor", e);
cleanup();
jonathan-buttner (Contributor), Oct 8, 2025:

I think there's a small potential for an edge case here (if we go the noop task route to do a shutdown for the queue.take()). If an exception occurs in startHandlingRateLimitedTasks(), it could cause the requestQueue to be drained which could mean that it'd never get the noop task.

I'd have to think of a good way to solve that. Maybe we split up the cleanup methods so that this one doesn't drain the requestQueue; instead, processRequestQueue() would call a different cleanup() that handles draining 🤔
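One possible shape for that split, as a sketch (method names here are hypothetical):

    // Hypothetical split: a startup failure must not drain the requestQueue,
    // otherwise the noop shutdown task could never reach the consumer thread.
    private void cleanupAfterStartupFailure() {
        // release executor resources only; leave requestQueue intact
    }

    // Called from processRequestQueue() once the consumer loop exits; this is
    // the only place that drains the requestQueue and rejects remaining tasks.
    private void cleanupRequestQueue() {
        List<RejectableTask> remaining = new ArrayList<>();
        requestQueue.drainTo(remaining);
        remaining.forEach(this::rejectRequest);
    }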

try {
while (isShutdown() == false) {
// Blocks the request queue thread until a new request comes in
var task = (RequestTask) requestQueue.take();
Contributor:

Do we need the cast?

Contributor:

I think it would be better to replace uses of RequestTask with RejectableTask in this class, since there's no reason that the interface can't be used throughout.
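Sketch of the suggested typing, assuming the queue's element type can be changed; this also removes the need for the cast:

    private final BlockingQueue<RejectableTask> requestQueue = new LinkedBlockingQueue<>();

    // later, in processRequestQueue():
    RejectableTask task = requestQueue.take(); // no (RequestTask) cast needed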

if (isShutdown()) {
logger.debug("Shutdown requested while handling request tasks, cleaning up");
cleanup();
Contributor:

I think if we move this to a finally block we probably don't need it here and we could probably remove the return.

});

endpoint.enqueue(task);
if (taskAccepted == false) {
Contributor:

If the task was accepted, we'll want to check isShutdown() one last time to ensure we notify this task that we're shutting down.

Here's an example: https://github.com/elastic/elasticsearch/blob/8.13/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.java#L302
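Roughly the pattern from the linked example, as a sketch (helper names are hypothetical):

    boolean taskAccepted = queue.offer(task);
    if (taskAccepted == false) {
        rejectTaskBecauseOfQueueCapacity(task);
    } else if (isShutdown()) {
        // A shutdown may have raced with offer(): the consumer might already be
        // gone, so notify queued tasks rather than leaving them stranded.
        notifyRequestsOfShutdown();
    }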

endpoint.enqueue(task);
}

private boolean rateLimitingEnabled(RequestManager requestManager) {
Contributor:

For consistency how about we make this static and accept a RateLimitSettings object. Then we can use it in executeEnqueuedTaskInternal which has a similar check. Technically executeEnqueuedTaskInternal should never receive a non-rate limited task, but it'd probably be good to check just in case.
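Sketch of the suggested signature (the exact accessor on RateLimitSettings is an assumption):

    // Static so it can be shared with executeEnqueuedTaskInternal, which has a
    // similar check; treat missing settings as rate limiting being disabled.
    private static boolean rateLimitingEnabled(RateLimitSettings rateLimitSettings) {
        return rateLimitSettings != null && rateLimitSettings.isEnabled();
    }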

timgrein (Contributor Author), Oct 8, 2025:

Good point! Adjusted with "Reuse rateLimitSettingsEnabled check".

@timgrein (Contributor Author) commented Oct 8, 2025

@timgrein Great work! Would you be able to similarly generate a graphic against a local EIS directly, so that we understand whether any overhead exists? I understand the logic behind the PR, but I'm curious what overhead still exists.

Do you mean on my local machine with "local EIS"? If so, ES and EIS both ran on my local machine for the results you see above.

Comment on lines 383 to 410
// Reject non-rate-limited requests
List<RejectableTask> requests = new ArrayList<>();
requestQueue.drainTo(requests);

for (var request : requests) {
rejectRequest(request);
}
}

private void rejectRequest(RejectableTask task) {
try {
task.onRejection(
new EsRejectedExecutionException(
format(
"Failed to send request for inference id [%s] has shutdown prior to executing request",
task.getRequestManager().inferenceEntityId()
),
true
)
);
} catch (Exception e) {
logger.warn(
format(
"Failed to notify request for inference id [%s] of rejection after executor service shutdown",
task.getRequestManager().inferenceEntityId()
)
);
}
DonalEvans (Contributor), Oct 8, 2025:

This code is duplicated in RateLimitingEndpointHandler, with the only difference being the format of the error message. Would it be possible to extract it to a static method that both places could call? Rather than needing to use the id field directly for the service grouping, it can be derived from the task: Integer.toString(task.getRequestManager().rateLimitGrouping().hashCode())

There are also quite a few other places where we reject tasks due to shutdown that could be extracted to a method call to reduce code duplication and ensure consistency for the error messages.
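A sketch of what the extracted helper could look like (placed somewhere both classes can reach; each call site passes its own pre-formatted message, and a static class-level logger is assumed):

    static void rejectTaskAfterShutdown(RejectableTask task, String message) {
        try {
            task.onRejection(new EsRejectedExecutionException(message, true));
        } catch (Exception e) {
            logger.warn(
                format(
                    "Failed to notify request for inference id [%s] of rejection after executor service shutdown",
                    task.getRequestManager().inferenceEntityId()
                ),
                e
            );
        }
    }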

timgrein (Contributor Author):

I think the id (inferenceEntityId, being the name of the inference endpoint AFAIU) and the service grouping are two different things, so IMO we should keep id in the error messages, so it's clear which endpoint failed to execute a request. I think the hashCode of the rateLimitGrouping could become quite cryptic in a log/error message.

But nothing speaks against extracting the common part to a static method 👍

DonalEvans (Contributor):

Sorry for the confusion, the id field in RateLimitingEndpointHandler is the service grouping hash code. The field is not well named in that respect.

thrownException.getMessage(),
is(
Strings.format(
"Failed to send request, request service [3355] for inference id [id] has shutdown prior to executing request",
DonalEvans (Contributor), Oct 8, 2025:

This should be service [%s] for inference

timgrein (Contributor Author), Oct 9, 2025:

I think it actually is correct: AFAIU hashCode is simply executed on the inferenceEntityId, leading to 3355 in this case.

DonalEvans (Contributor):

What I mean is that you're using String.format() to construct the error message and passing requestManager.rateLimitGrouping().hashCode() as the second argument to it, but the string in the first argument doesn't have any placeholders for that value to be used. Right now the test is passing because the rate limit grouping is a constant from run to run, but if it ever changes, then the hard-coded hash code of 3355 will no longer be correct and the test will fail.
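Concretely, the corrected assertion would actually use the placeholder:

    assertThat(
        thrownException.getMessage(),
        is(
            Strings.format(
                "Failed to send request, request service [%s] for inference id [id] has shutdown prior to executing request",
                requestManager.rateLimitGrouping().hashCode()
            )
        )
    );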

timgrein (Contributor Author):

Ah, got it now; adjusted with "Use string placeholder in assertion".

Comment on lines 401 to 403
service.submitTaskToRateLimitedExecutionPath(
new RequestTask(requestManager, new EmbeddingsInput(List.of(), InputTypeTests.randomWithNull()), null, threadPool, listener)
);
Contributor:

Is this change necessary? The test still passes if execute() is called instead of submitTaskToRateLimitedExecutionPath().

Also, while I know you didn't add it in this PR, this test's name is at odds with what it's actually asserting. The name implies that we don't expect an exception when polling the queue to cause the service to terminate, but the test explicitly terminates the service as part of throwing the exception, then asserts that it's terminated. Is this test actually testing anything other than that calling shutdown() causes the service to shut down?

timgrein (Contributor Author), Oct 10, 2025:

Adjusted the test to use execute again.

Is this test actually testing anything other than that calling shutdown() causes the service to shut down?

Good point, I've adjusted the test to assert that the service still runs after task.execute(...) threw an exception. I've used task.execute(...) instead of queue.poll(), because poll() is also used internally in AdjustableCapacityBlockingQueue when calling requestQueue.take() in processRequestQueue, which did in fact terminate the service and made the test green for the wrong reason. Commit: "Adjust test to check that a throwing task does not terminate the service". (I'll add a similar test for the non-rate-limited execution path.)

Speaking of that: executeTaskImmediately inside processRequestQueue handles any exceptions thrown by the processed request task without terminating the service. AFAIU take() shouldn't throw except when its calling thread is interrupted, which we need to handle explicitly anyway. I've adjusted the error message in the general Exception handler (commit: "Adjust error message in general exception handler") to reflect that an exception there is not coming from a task or a task rejection, but is potentially a more fundamental issue leading to service termination. Just wanted to double-check that my reasoning is correct here and that we want this behavior.
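A sketch of the resulting control flow, consistent with the reasoning above (the exact structure is an assumption):

    try {
        while (isShutdown() == false) {
            var task = requestQueue.take(); // throws only InterruptedException
            executeTaskImmediately(task);   // contains per-task exceptions itself
        }
    } catch (InterruptedException e) {
        logger.debug("Inference request queue interrupted, exiting");
        Thread.currentThread().interrupt();
    } catch (Exception e) {
        // Not a task failure: something more fundamental, so the service
        // terminates after logging.
        logger.warn("Error in inference request queue, terminating", e);
    } finally {
        cleanup();
    }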

endpointHandler.init();
return endpointHandler;
});
boolean taskAccepted = requestQueue.offer(task);
DonalEvans (Contributor):

If I understand correctly, prior to this change, when execute() was called, we would retrieve (or create) a RateLimitingEndpointHandler appropriate to the task's rate limit settings, then add the task to the queue associated with that handler. This meant that there was one queue per RateLimitingEndpointHandler, each with a capacity defined by the RequestExecutorServiceSettings, and that we wouldn't reject a task unless the queue for the handler associated with it was full.

With the new change, all tasks are first submitted to the single requestQueue queue, which has the same capacity as each of the queues managed by the handlers, meaning that we can begin rejecting tasks even though none of the queues associated with the handlers are full, effectively reducing the total number of requests we can process in a given time.

Would it be better to instead only add tasks with no rate limit settings to the request queue, and call submitTaskToRateLimitedExecutionPath() in the execute() method for tasks with rate limit settings? That way, we're not adding rate limited tasks to one queue just to later remove them and add them to another queue.
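A sketch of the suggested routing (names follow the snippets quoted in this review; exact signatures are assumptions):

    public void execute(RequestTask task) {
        if (rateLimitingEnabled(task.getRequestManager().rateLimitSettings())) {
            // Rate-limited tasks go straight to their per-endpoint queue instead
            // of taking a detour through the shared requestQueue.
            submitTaskToRateLimitedExecutionPath(task);
            return;
        }
        boolean taskAccepted = requestQueue.offer(task);
        if (taskAccepted == false) {
            rejectTaskBecauseOfQueueCapacity(task); // hypothetical helper
        }
    }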

timgrein (Contributor Author):

That's a very valid point, @DonalEvans! I'll adjust the implementation, thanks for flagging.

rejectRequest(
task,
format("Failed to send request for inference id [%s] has shutdown prior to executing request", inferenceEntityId),
Contributor:

This message might be better as "Failed to send request for inference id [%s] because the request executor service has been shutdown" to make it consistent with the error we report in execute() if we try to queue a task when we're shut down.

task,
format(
"Failed to send request, request service [%s] for inference id [%s] has shutdown prior to executing request",
id,
Contributor:

Not strictly related to this PR, but since you're making changes in this class, could you rename the id field on RateLimitingEndpointHandler to be something more descriptive, like rateLimitGroupingId?

requestManager.rateLimitGrouping().hashCode()
)
)
is("Failed to execute task for inference id [id] because the request service [3355] queue is full")
Contributor:

This should be using Strings.format() with [3355] replaced with [%s] and the other argument being requestManager.rateLimitGrouping().hashCode().

Comment on lines 741 to 742
@SuppressWarnings("unchecked")
var stubbing = when(mockExecutorService.submit(any(Runnable.class))).thenReturn(mock(Future.class));
Contributor:

This suppression and unused variable can be avoided using the thenAnswer() method, which I think is a little cleaner:

when(mockExecutorService.submit(any(Runnable.class))).thenAnswer(i -> mock(Future.class));

}

if (rateLimitingEnabled(requestManager.rateLimitSettings())) {
if (isEmbeddingsIngestInput(inferenceInputs)) {
Member:

The rateLimitingEnabled check should be retained:

if (isEmbeddingsIngestInput(inferenceInputs) || rateLimitingEnabled(requestManager.rateLimitSettings())) {

}
}

private static final RejectableTask NoopTask = new RejectableTask() {
Contributor:

nit: When I first saw the comparison I thought this was a class name. How about we make it all caps (e.g. NOOP_TASK) or noopTask?
